【Dubbo】Dubbo SPI机制

Dubbo SPI

在Dubbo的源码中,可以看到很多地方使用了ExtensionLoader来获取具体的扩展类,以Protocol为例,Protocol是一个接口,它可以有多种协议,那么具体选择哪一种协议呢,就是通过ExtensionLoader的getExtensionLoader方法获取Protocol对应的ExtensionLoader对象,然后调用其getAdaptiveExtension方法获取具体的实现类的,它可以根据配置选择不同的协议:

1
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

可以看到Protocol使用到了@SPI注解,默认使用dubbo协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SPI("dubbo")
public interface Protocol {

int getDefaultPort();

@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

void destroy();

default List<ProtocolServer> getServers() {
return Collections.emptyList();
}

}

在META-INF/dubbo下面可以看到dubbo对应的类为org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol:

1
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

因为META-INF下dubbo对应的扩展类为DubboProtocol,所以通过ExtensionLoader获取Protocol具体的实现类时,如果没有指定协议,将默认使用DubboProtocol。

Dubbo还使用了自适应扩展机制,也就是@Adaptive,它可以加载类上,也可以加在某个方法上,加在某个方法中时,会动态生成字节码创建自适应对象Protocol$Adaptive,Protocol$Adaptive中会根据url中的协议选择不同的实现类。接下来就进入ExtensionLoader中看一下SPI机制的实现原理。

ExtensionLoader

getExtensionLoader方法

EXTENSION_LOADERS:缓存每个class对象对应的ExtensionLoader,dubbo会为每个class都创建一个ExtensionLoader对象。

getExtensionLoader方法用来根据class对象获取对应的ExtensionLoader,就是从EXTENSION_LOADERS获取的,如果为空,会创建一个ExtensionLoader对象并放入EXTENSION_LOADERS中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ExtensionLoader<T> {

// 一个map,key为class类型,value为对应的ExtensionLoader对象
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);

// 根据class类型获取ExtensionLoader
@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
// 创建一个ExtensionLoader对象并放入EXTENSION_LOADERS中
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
// 返回ExtensionLoader对象
return loader;
}
}

ExtensionLoader中还有一个getExtension方法,它可以根据名称来获取对应的实现类,比如自定义了一个Protocol的实现类MyDubboProtocol:

1
2
3
public class MyDubboProtocol implements Protocol {

}

在META-INF/dubbo下面配置一个自定义的协议,key为mydubbo,value为MyDubboProtocol全限定类名:

1
mydubbo=org.apache.dubbo.rpc.protocol.dubbo.MyDubboProtocol

接下来通过ExtensionLoader的getExtension方法中传入mydubbo就可以获取到MyDubboProtocol:

1
2
// 此时获取的Protocol类型就是MyDubboProtocol
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("mydubbo");

getExtension方法

getExtension方法,会根据传入的名称获取对应的实例化对象,如果对象为空,会调用createExtension方法创建扩展对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 根据名称获取对象
@SuppressWarnings("unchecked")
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
// 获取实例对象
Object instance = holder.get();
// 如果对象为空
if (instance == null) {
synchronized (holder) {
// 再次获取判断是否为空
instance = holder.get();
if (instance == null) {
// 创建createExtension对象
instance = createExtension(name);
// 放入holder
holder.set(instance);
}
}
}
return (T) instance;
}

createExtension

createExtension方法用于创建扩展对象:

  1. 它会调用getExtensionClasses获取所有的class
  2. 根据传入的名称name获取class对象
  3. 根据第2步获取的class对象,从EXTENSION_INSTANCES(缓存了每个class对应的实例化对象)中获取class对应的实例化对象,如果为空,通过newInstance实例化一个对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 缓存每个class对象对应的实例化对象
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>(64);

// 创建对象
private T createExtension(String name) {
// 调用getExtensionClasses,根据名称获取对应的class
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
// 根据class获取对应的实例
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 如果为空,创建一个对象
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 注入
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
// 初始化
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}

getExtensionClasses

getExtensionClasses用于获取所有的扩展类的class信息,如果为空调用loadExtensionClasses方法加载所有的class信息,并放入一个map中,将map封装为一个Holer对象,也就是cachedClasse:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

// 获取所有的class
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 调用loadExtensionClasses加载所有的class
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

比如dubbo协议,key为dubbo,value为org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol,在getExtensionClasses方法中就会将这些信息放入到cachedClasses中:

1
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

loadExtensionClasses

loadExtensionClasses加载所有的扩展类的class信息,它会遍历所有的加载策略,从对应的目录下加载class信息:

1
2
3
4
5
6
7
8
9
10
11
12
// 加载class
private Map<String, Class<?>> loadExtensionClasses() {
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 遍历所有的加载策略
for (LoadingStrategy strategy : strategies) {
// 从目录下加载class信息
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
LoadingStrategy
1
2
3
4
5
6
7
8
// loadLoadingStrategies获取所有的加载策略
private static volatile LoadingStrategy[] strategies = loadLoadingStrategies();
// 获取所有的加载策略
private static LoadingStrategy[] loadLoadingStrategies() {
return stream(load(LoadingStrategy.class).spliterator(), false)
.sorted()
.toArray(LoadingStrategy[]::new);
}

LoadingStrategy是一个接口,它有四个实现类:

以DubboLoadingStrategy为例,可以看到directory的值为”META-INF/dubbo/“,所以loadExtensionClasses中会加载META-INF/dubbo/下的所有配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DubboLoadingStrategy implements LoadingStrategy {

@Override
public String directory() {
return "META-INF/dubbo/";
}

@Override
public boolean overridden() {
return true;
}

@Override
public int getPriority() {
return NORMAL_PRIORITY;
}
}

Adaptive自适应扩展

在最开始,获取Protocol的时候调用的是getAdaptiveExtension方法,并且Protocol的export和refer上添加了@Adaptive注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

@SPI("dubbo")
public interface Protocol {

int getDefaultPort();

@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

void destroy();

default List<ProtocolServer> getServers() {
return Collections.emptyList();
}

}

getAdaptiveExtension

getAdaptiveExtension方法用来获取自适应扩展对象,如果为空,将会调用createAdaptiveExtension创建扩展对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final Holder<Object> cachedAdaptiveInstance = new Holder<>();

@SuppressWarnings("unchecked")
public T getAdaptiveExtension() {
// 从缓存中获取
Object instance = cachedAdaptiveInstance.get();
// 如果为空
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
// 加锁
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 创建自适应Extension对象
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}

return (T) instance;
}

createAdaptiveExtension

createAdaptiveExtension用于创建自适应扩展对象,它会通过AdaptiveClassCodeGenerator动态生成字节码,创建代理对象XXX$Adaptive:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 private T createAdaptiveExtension() {
try {
// 创建自适应Extension对象
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
// 获取自适应Extension class
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
// 创建自适应Extension Class
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

private Class<?> createAdaptiveExtensionClass() {
// 创建AdaptiveClassCodeGenerator自适应代码生成对象,动态生成字节码
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

以Protocol为例,会动态生成Protocol$Adaptive类,在Protocol$Adaptive中实现了Protocol接口中的方法,以export为例,它根据url中的协议获取扩展名,再根据名称调用上面提到的getExtension方法获取具体的实现类,所以在这里才会生成DubboProtocol,之后调用DubboProtocol的export方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
// 根据url中的协议获取扩展名
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
// 根据扩展名获取扩展类
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}

public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}

public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}

public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}

public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
}

参考

Dubbo - Dubbo的SPI机制

【拉勾教育】Dubbo源码解读与实战

dubbo版本:2.7.7